api,coordinator: add drain API orchestration #4762
hongyunyan wants to merge 3 commits into split/pr-4190-2c-coordinator-drain-runtime
Conversation
Code Review
This pull request implements the drainCapture API by introducing an epoch-based drain session management system in the coordinator. It adds mechanisms for broadcasting drain targets, tracking changefeed-level migration progress, and ensuring clean session termination through heartbeat acknowledgments. The review feedback highlights opportunities to improve diagnostic logging when a drain is blocked and to optimize the performance of changefeed status aggregation in large-scale environments.
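The epoch-based session management described above can be sketched roughly as follows. This is an illustrative model only, assuming a single active session whose epoch fences out stale heartbeat acknowledgments; the struct and method names (drainSession, drainCoordinator, StartDrain, AckHeartbeat) are hypothetical, not the PR's actual types.

```go
package main

import "fmt"

// drainSession is a hypothetical sketch of single-session drain state:
// only one target may be draining at a time, and each session carries an
// epoch so that acknowledgments from an earlier session are rejected.
type drainSession struct {
	target string // node being drained
	epoch  uint64 // bumped for every new session
	active bool
}

type drainCoordinator struct {
	session drainSession
}

// StartDrain begins (or idempotently re-joins) a session, bumping the
// epoch for a fresh one. A concurrent drain of a different node is refused.
func (c *drainCoordinator) StartDrain(target string) (uint64, error) {
	if c.session.active && c.session.target != target {
		return 0, fmt.Errorf("drain already in progress for %s", c.session.target)
	}
	if !c.session.active {
		c.session = drainSession{target: target, epoch: c.session.epoch + 1, active: true}
	}
	return c.session.epoch, nil
}

// AckHeartbeat accepts an acknowledgment only if it carries the current epoch.
func (c *drainCoordinator) AckHeartbeat(epoch uint64) bool {
	return c.session.active && epoch == c.session.epoch
}

func main() {
	c := &drainCoordinator{}
	epoch, _ := c.StartDrain("node-1")
	fmt.Println(c.AckHeartbeat(epoch))     // true: current epoch accepted
	fmt.Println(c.AckHeartbeat(epoch - 1)) // false: stale epoch rejected
}
```

The epoch check is what makes session termination clean: a heartbeat ack produced before a session restart can never complete the new session.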
coordinator/controller_drain.go
Outdated
    if observation.remaining > 0 {
        log.Info("drain completion blocked by remaining work",
            zap.Stringer("targetNodeID", target),
            zap.Uint64("targetEpoch", targetEpoch),
            zap.Int("maintainersOnTarget", observation.maintainersOnTarget),
            zap.Int("inflightOpsInvolvingTarget", observation.inflightOpsInvolvingTarget),
            zap.Int("dispatcherCountOnTarget", observation.dispatcherCountOnTarget),
            zap.Int("targetInflightDrainMoveCount", observation.targetInflightDrainMoveCount),
            zap.Int("pendingStatusCount", observation.pendingStatusCount),
            zap.Int("remaining", observation.remaining))
    }
The log message only triggers when observation.remaining > 0. However, DrainNode can return 1 even when remaining is 0, if other completion signals (like stoppingObserved) are false. This makes it difficult to diagnose from the logs why a drain is stuck at 1. Consider logging the full observation state whenever isDrainCompletionProven returns false:
if !isDrainCompletionProven(
observation.nodeState,
observation.drainingObserved,
observation.stoppingObserved,
observation.remaining,
) {
log.Info("drain completion blocked",
zap.Stringer("targetNodeID", target),
zap.Uint64("targetEpoch", targetEpoch),
zap.Stringer("nodeState", observation.nodeState),
zap.Bool("drainingObserved", observation.drainingObserved),
zap.Bool("stoppingObserved", observation.stoppingObserved),
zap.Int("maintainersOnTarget", observation.maintainersOnTarget),
zap.Int("inflightOpsInvolvingTarget", observation.inflightOpsInvolvingTarget),
zap.Int("dispatcherCountOnTarget", observation.dispatcherCountOnTarget),
zap.Int("targetInflightDrainMoveCount", observation.targetInflightDrainMoveCount),
zap.Int("pendingStatusCount", observation.pendingStatusCount),
zap.Int("remaining", observation.remaining))
}
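The conservative predicate the suggestion leans on could look like the sketch below. This is an assumed shape inferred from the argument list in the suggestion (nodeState, drainingObserved, stoppingObserved, remaining); the nodeState constants and exact rules are hypothetical, not the PR's implementation.

```go
package main

import "fmt"

// nodeState is a hypothetical lifecycle state for the drain target.
type nodeState int

const (
	nodeStateDraining nodeState = iota
	nodeStateStopping
	nodeStateRemoved
)

// isDrainCompletionProven sketches a conservative completion check:
// completion is proven only when no work remains AND the target was
// actually observed entering both the draining and stopping phases,
// so a node that never acknowledged the drain is never reported done.
func isDrainCompletionProven(state nodeState, drainingObserved, stoppingObserved bool, remaining int) bool {
	if remaining > 0 {
		return false
	}
	if !drainingObserved || !stoppingObserved {
		return false
	}
	return state == nodeStateStopping || state == nodeStateRemoved
}

func main() {
	fmt.Println(isDrainCompletionProven(nodeStateStopping, true, true, 0))  // true: proven
	fmt.Println(isDrainCompletionProven(nodeStateDraining, true, false, 0)) // false: remaining is 0 but stop not observed
}
```

The second call is exactly the case the review comment is about: remaining is already 0, yet completion is not proven, so logging gated only on remaining > 0 stays silent.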
coordinator/controller_drain.go
Outdated
    }

    targetID := target.String()
    cfs := c.changefeedDB.GetReplicating()
Calling c.changefeedDB.GetReplicating() inside aggregateDrainTargetProgress rescans the replicating changefeeds on every DrainNode call. While TiCDC changefeed counts are typically manageable in memory, this could become a performance bottleneck or increase GC pressure if the API is polled frequently in clusters with thousands of changefeeds. Consider whether this progress can be tracked incrementally or cached.
What problem does this PR solve?
The remaining part of #4190 is the public drain API and the completion orchestration on top of the runtime layers from #4759, #4760, and #4761. This PR extracts that final layer so reviewers can focus on API semantics, single-session orchestration, and the remaining-work contract without re-reviewing the lower-level protocol and scheduler changes.
Issue Number: ref #4190
What is changed and how it works?
Background:
Motivation:
Summary:
- PUT /api/v1/captures/drain implementation with coordinator-backed drain requests
- drainSession and drainClearState orchestration in coordinator

How it works:
Validation note:
go test ./coordinator in the current environment still hits a fixed-port conflict because *:28300 is already occupied, but the targeted drain tests for this PR pass.

Check List
Tests
- go test ./api/v1 ./coordinator/operator ./pkg/server
- go test -run 'TestDrain|TestDispatcherDrain|TestSetDrain|Test.*Drain' ./coordinator

Questions
Will it cause performance regression or break compatibility?
This PR intentionally changes the v1 drain endpoint from a stub to real coordinator-backed behavior. The implementation keeps the completion check conservative so the API does not return zero remaining until drain completion is proven.
Do you need to update user documentation, design documentation or monitoring documentation?
No additional design or monitoring document changes are needed for this split. The implementation follows the approved drain-capture split design and existing drain design docs.
Release note